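To launch a notebook with pyspark (in local mode), run the following command in your shell:
PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook pyspark --master "local[*]"
(With Spark 1.x, use IPYTHON_OPTS="notebook" pyspark --master "local[*]" instead; the IPYTHON_OPTS variable was removed in Spark 2.0.)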
This notebook aims at demonstrating the similarities and main differences between two powerful Machine Learning libraries: scikit-learn and Spark ML.
The main objective is to show how simple it is to move from scikit-learn to Spark ML when Machine Learning workflows have to be trained and applied on much larger datasets.
As we will see, Spark ML is largely inspired by scikit-learn's design, so a scikit-learn user can easily pick up the Spark ML API when a Big Data workflow is needed.
In order to explain and present the main concepts behind both libraries, we will go through a complete example to build an entire Machine Learning workflow, and present the code for both scikit-learn and Spark ML at every step.
We will work on the 20 Newsgroups dataset, a collection of newsgroup posts grouped by topic (politics, sports, science, etc.). This example is drawn from one of scikit-learn's tutorials on text data: http://scikit-learn.org/stable/tutorial/text_analytics/working_with_text_data.html#loading-the-20-newsgroups-dataset.
In [1]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
Let's start with a very simple example to compare the use of scikit-learn and Spark ML. Both libraries share the same notion of Estimator/Transformer, and these objects are used in the same way. There are two main differences, though:
In [2]:
import pandas as pd
from sklearn.datasets import load_iris
data = pd.DataFrame(data=load_iris().data, columns=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'])
In [3]:
data.head()
Out[3]:
In [5]:
from sklearn.preprocessing import Binarizer
binarizer = Binarizer(threshold=5)
binarizer.fit_transform(data[['sepal_length']])  # 2-D input: a single-column DataFrame
Out[5]:
In [6]:
df = sqlContext.createDataFrame(data)
In [7]:
from pyspark.ml.feature import Binarizer
binarizer = Binarizer(threshold=5.0, inputCol='sepal_length', outputCol='sepal_length_bin')
binarizer.transform(df).show(5)
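Both Binarizer objects apply the same rule: values strictly greater than the threshold become 1, and all other values (including those equal to the threshold) become 0. The plain-Python sketch below reproduces that rule on a few hand-picked sepal lengths; the function name binarize is hypothetical and is not part of either library.

```python
def binarize(values, threshold):
    """Return 1.0 for values strictly greater than threshold, 0.0 otherwise."""
    return [1.0 if v > threshold else 0.0 for v in values]


# A few sepal lengths (in cm), picked by hand for illustration
sepal_lengths = [5.1, 4.9, 5.0, 6.3]
print(binarize(sepal_lengths, threshold=5.0))  # [1.0, 0.0, 0.0, 1.0]
```

Note that 5.0 maps to 0.0: the comparison is strict in both libraries.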
Let's now work on the 20 NewsGroup dataset and prepare the data in both libraries.
There is a scikit-learn loader for this dataset. We will convert the data and the target to pandas DataFrames.
In [9]:
# Import data
from sklearn.datasets import fetch_20newsgroups
categories = ['rec.autos', 'rec.sport.baseball', 'comp.graphics', 'comp.sys.mac.hardware',
              'sci.space', 'sci.crypt', 'talk.politics.guns', 'talk.religion.misc']
newsgroup = fetch_20newsgroups(subset='train', categories=categories, shuffle=True, random_state=42)
print(newsgroup.data[0])
# Create pandas DataFrames for values and targets
import pandas as pd
pdf_newsgroup = pd.DataFrame(data=newsgroup.data, columns=['news']) # Texts
pdf_newsgroup_target = pd.DataFrame(data=newsgroup.target, columns=['target']) # Targets
In Spark ML, one usually gathers all the information (data and targets) into the same DataFrame. We will therefore create a single Spark DataFrame by concatenating the two previous pandas DataFrames.
In [10]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df_newsgroup = sqlContext.createDataFrame(pd.concat([pdf_newsgroup, pdf_newsgroup_target], axis=1))
df_newsgroup.printSchema()
df_newsgroup.show(3)
Train-Test split is a common operation in Machine Learning. It means that we hold out part of the available data as a test set and treat it as if it were new data. The Machine Learning algorithm is trained on the remaining training set, and its predictions on the test set are compared to the ground truth, in order to measure the generalization capacity of the algorithm (its ability to handle new data, and not only the data used for training).
In scikit-learn, a Train-Test split is simply done with the function train_test_split from the sklearn.model_selection module (sklearn.cross_validation in older versions).
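The hold-out idea can be sketched in a few lines of plain Python: shuffle the rows with a fixed seed, then cut them into two parts. This is only an illustration; the helper holdout_split is hypothetical, and its rounding of the training size is not necessarily the one scikit-learn's train_test_split uses.

```python
import random


def holdout_split(rows, train_size=0.8, seed=42):
    """Shuffle a copy of rows with a fixed seed and cut it into (train, test)."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    n_train = int(round(train_size * len(shuffled)))
    return shuffled[:n_train], shuffled[n_train:]


train, test = holdout_split(range(10), train_size=0.8)
print(len(train), len(test))  # 8 2
```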
In [11]:
from sklearn.model_selection import train_test_split  # sklearn.cross_validation was removed in scikit-learn 0.20
X_train, X_test, y_train, y_test = train_test_split(newsgroup.data, newsgroup.target, train_size=0.8, random_state=42)
In Spark, the more general DataFrame method randomSplit randomly splits any DataFrame according to the given proportions. There is no need to separate the data from the target, since both are kept in the same DataFrame.
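Unlike train_test_split, randomSplit does not cut the data at an exact index: the weights are normalized if they do not sum to 1, and each row is assigned to a split at random, so the split sizes are only approximately proportional to the weights. The sketch below illustrates that behaviour in plain Python; random_split is a hypothetical helper, and Spark's actual implementation (which samples each partition) differs in its details. Passing a seed, as in randomSplit([0.8, 0.2], seed=42), makes the Spark split reproducible.

```python
import random


def random_split(rows, weights, seed=42):
    """Assign each row to one split by comparing a uniform draw with the
    cumulative, normalized weights (hypothetical helper, not Spark's code)."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point shortfall below 1.0
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(1000), [8, 2])  # weights need not sum to 1
print(len(train), len(test))  # roughly 800 and 200, but not exactly
```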
In [12]:
(df_train, df_test) = df_newsgroup.randomSplit([0.8, 0.2])
Feature Engineering covers all the operations performed on the data to transform, extract and select features, in order to capture as much information as possible and optimize the performance of Machine Learning algorithms.
Since most algorithms take numerical data as input, in our case we need to extract knowledge from the text data and convert it into numerical features. Here are the transformations we are going to perform:
In both scikit-learn and Spark ML, there are objects to perform these transformations.
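In both cases, these objects are used in much the same way: estimators have a fit() method that learns from the data, and transformers have a transform() method that converts it. scikit-learn objects usually offer both methods, while in Spark ML an Estimator's fit() returns a separate Model, which is the object that provides transform().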
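The difference in conventions can be made concrete with two hypothetical toy estimators that subtract the training mean from new values. Under the scikit-learn convention, fit() stores the learned state on the object itself and returns self; under the Spark ML convention, an Estimator's fit() returns a separate Model object, which is a Transformer. None of these classes exist in either library; they only mirror the calling patterns.

```python
class SklearnStyleCenterer:
    """scikit-learn convention: fit() stores learned state on self and returns self."""

    def fit(self, X):
        self.mean_ = sum(X) / len(X)
        return self

    def transform(self, X):
        return [x - self.mean_ for x in X]

    def fit_transform(self, X):
        return self.fit(X).transform(X)


class CentererModel:
    """Spark ML convention: the fitted Model is a Transformer holding the learned state."""

    def __init__(self, mean):
        self.mean = mean

    def transform(self, X):
        return [x - self.mean for x in X]


class SparkStyleCenterer:
    """Spark ML convention: an Estimator's fit() returns a new Model, not self."""

    def fit(self, X):
        return CentererModel(sum(X) / len(X))


train, test = [1.0, 2.0, 3.0], [4.0]
print(SklearnStyleCenterer().fit(train).transform(test))  # [2.0]
print(SparkStyleCenterer().fit(train).transform(test))    # [2.0]
```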
NB: The objects used are not exactly the same and do not have the same default parameters, so the results will differ. The purpose here is to show how to use Spark ML and how closely it resembles scikit-learn.
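One such difference is how words become columns. CountVectorizer learns a vocabulary during fit(), while HashingTF needs no fitting: it maps each term to a column through a hash function modulo numFeatures, so distinct words can collide. The sketch below contrasts the two ideas on whitespace-split toy sentences. It is a simplification under stated assumptions: the real tokenizers differ, the helper names are hypothetical, and zlib.crc32 stands in for the MurmurHash3 function recent Spark versions use (Python's built-in hash() is randomized between runs for strings, so it is avoided here).

```python
import zlib

docs = ["the car is red", "the rocket is in space"]

# CountVectorizer-like: fit() learns a vocabulary (word -> column index) from the training texts
vocabulary = {word: i for i, word in enumerate(sorted({w for d in docs for w in d.split()}))}


def count_vector(doc):
    """Count occurrences of known words; words unseen during fit are ignored."""
    vec = [0] * len(vocabulary)
    for word in doc.split():
        if word in vocabulary:
            vec[vocabulary[word]] += 1
    return vec


def hashed_vector(doc, num_features=16):
    """HashingTF-like: no vocabulary, the column is hash(word) modulo num_features.
    zlib.crc32 is a stand-in for Spark's MurmurHash3."""
    vec = [0] * num_features
    for word in doc.split():
        vec[zlib.crc32(word.encode("utf-8")) % num_features] += 1  # collisions are possible
    return vec


print(count_vector("a blue car"))   # [1, 0, 0, 0, 0, 0, 0]: only 'car' was seen during fit
print(hashed_vector("a blue car"))  # every word lands in some column, known or not
```

The hashing approach keeps no vocabulary in memory, which suits large distributed datasets, at the cost of possible collisions and of not being able to map a column back to a word.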
In [13]:
# Tokenizing and Occurrence Counts
from sklearn.feature_extraction.text import CountVectorizer
count_vect = CountVectorizer()
X_train_counts = count_vect.fit_transform(X_train)
# TF-IDF
from sklearn.feature_extraction.text import TfidfTransformer
tfidf_transformer = TfidfTransformer()
X_train_tfidf = tfidf_transformer.fit_transform(X_train_counts)
In [14]:
# Tokenizing
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol='news', outputCol='news_words')
df_train_words = tokenizer.transform(df_train)
# Hashing Term-Frequency
from pyspark.ml.feature import HashingTF
hashing_tf = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol='news_tf', numFeatures=10000)
df_train_tf = hashing_tf.transform(df_train_words)
# Inverse Document Frequency
from pyspark.ml.feature import IDF
idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="news_tfidf")
idf_model = idf.fit(df_train_tf)  # fit computes document frequencies on the whole training set; the model then rescales each row
df_train_tfidf = idf_model.transform(df_train_tf)
In [15]:
df_train_tfidf.show(5)
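The IDF weights are not computed with the same formula in the two libraries, which is one reason why the features (and thus the scores) differ. Spark ML's IDF uses log((m + 1) / (df(t) + 1)), where m is the number of documents, while scikit-learn's TfidfTransformer with its default smooth_idf=True uses ln((1 + n) / (1 + df(t))) + 1 and then L2-normalizes each row by default. The plain-Python sketch below computes both weightings on two toy documents; the helper names are hypothetical.

```python
import math


def document_frequency(tokenized_docs):
    """Number of documents in which each term appears."""
    doc_freq = {}
    for doc in tokenized_docs:
        for term in set(doc):
            doc_freq[term] = doc_freq.get(term, 0) + 1
    return doc_freq


def idf_spark(doc_freq, n_docs):
    # Spark ML IDF: log((m + 1) / (df(t) + 1))
    return {t: math.log((n_docs + 1) / (d + 1)) for t, d in doc_freq.items()}


def idf_sklearn(doc_freq, n_docs):
    # scikit-learn TfidfTransformer, smooth_idf=True: ln((1 + n) / (1 + df(t))) + 1
    return {t: math.log((1 + n_docs) / (1 + d)) + 1 for t, d in doc_freq.items()}


def tf_idf(doc, idf):
    """Raw term counts multiplied by the IDF weight (no row normalization)."""
    counts = {}
    for term in doc:
        counts[term] = counts.get(term, 0) + 1
    return {t: c * idf[t] for t, c in counts.items()}


docs = [["the", "car", "is", "red"], ["the", "rocket", "is", "in", "space"]]
doc_freq = document_frequency(docs)
spark_idf, sklearn_idf = idf_spark(doc_freq, len(docs)), idf_sklearn(doc_freq, len(docs))
print(spark_idf["the"], sklearn_idf["the"])  # 0.0 1.0: "the" appears in every document
print(tf_idf(docs[0], spark_idf))
```

With Spark's formula a term present in every document gets a zero weight, whereas scikit-learn's extra "+ 1" keeps it in the vector with a small weight.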
Now that the data is ready to be used, we can start the modelling step. For this example, we will use a simple algorithm: a Decision Tree. Both scikit-learn and Spark ML have a DecisionTreeClassifier object for this.
The parameters of this classifier are essentially the same in both libraries, with slightly different names (max_depth in scikit-learn, maxDepth in Spark ML). They are used in exactly the same way.
There is one slight difference, though. In Spark ML, we need to specify that the target column is categorical, even when using a classifier, because the Spark ML classifier needs to know the number of classes. One way to do this is to use a StringIndexer, which converts the column into a double column whose metadata holds the number of classes. Note that StringIndexer assigns indices by label frequency by default (the most frequent label gets 0.0), so the model's predictions refer to these indices, not to the original target values.
If you don't do this, you will get an error like: "DecisionTreeClassifier was given input with invalid label column target, without the number of classes specified. See StringIndexer."
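Since this label ordering often surprises users, here is a plain-Python sketch of how a StringIndexer builds its mapping by default: labels are converted to strings, counted, and indexed by decreasing frequency. The helper fit_string_indexer is hypothetical, and breaking ties alphabetically is an assumption of this sketch. In Spark ML, the fitted StringIndexerModel exposes the ordered labels, and IndexToString can map predicted indices back to the original values.

```python
from collections import Counter


def fit_string_indexer(labels):
    """Most frequent label -> 0.0, next -> 1.0, ...
    Ties are broken alphabetically here (an assumption of this sketch)."""
    counts = Counter(str(label) for label in labels)  # numeric labels are indexed as strings
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: float(i) for i, lab in enumerate(ordered)}


mapping = fit_string_indexer([3, 1, 3, 7, 3, 1])
print(mapping)  # {'3': 0.0, '1': 1.0, '7': 2.0}

# Mapping an index back to the original label (what IndexToString does in Spark ML)
inverse = {index: label for label, index in mapping.items()}
print(inverse[1.0])  # 1
```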
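One last important note: always perform the learning task on the training set, and the predictions on the test set. Before the model can make predictions on it, the test set must go through the same transformations as the training set, using the transformers already fitted on the training set (they must not be fitted again on the test data).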
In [16]:
# Training a Decision Tree on training set
from sklearn.tree import DecisionTreeClassifier
clf = DecisionTreeClassifier(max_depth=10).fit(X_train_tfidf, y_train)
# Transform test set
X_test_counts = count_vect.transform(X_test)
X_test_tfidf = tfidf_transformer.transform(X_test_counts)
# Predictions on the test set
y_test_pred = clf.predict(X_test_tfidf)
In [17]:
# Indexing the target
from pyspark.ml.feature import StringIndexer
string_indexer = StringIndexer(inputCol='target', outputCol='target_indexed')
string_indexer_model = string_indexer.fit(df_train_tfidf)
df_train_final = string_indexer_model.transform(df_train_tfidf)
In [18]:
# Training a Decision Tree on training set
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol=idf.getOutputCol(), labelCol=string_indexer.getOutputCol())
dt_model = dt.fit(df_train_final)
# Transform the test set
df_test_words = tokenizer.transform(df_test)
df_test_tf = hashing_tf.transform(df_test_words)
df_test_tfidf = idf_model.transform(df_test_tf)
df_test_final = string_indexer_model.transform(df_test_tfidf)
# Predictions on the test set
df_test_pred = dt_model.transform(df_test_final)
In [19]:
df_test_pred.select('news', 'target', 'prediction', 'probability').show(5)
As we can see, the number of steps to perform can be quite large, especially for the Feature Engineering part. Chaining all the required steps on the training set to train a model, and then performing them all again on the test set to make predictions, can be tedious.
The Pipeline object is there to make our lives easier. It gathers into a single estimator all the steps that transform the data, and it is then applied to the raw data of both the training and test sets.
The steps to perform are the following:
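When the fit() method is called, the Pipeline object goes through the stages in the order specified: an estimator is fitted on the current data, and the fitted stage (or a plain transformer) then transforms the data passed on to the next stage. The final estimator is only fitted. Calling predict() (scikit-learn) or transform() (Spark ML) on the fitted pipeline then applies every transformation to new data before making predictions.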
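The fitting loop described above can be sketched as a toy pipeline in plain Python. MiniPipeline, MiniPipelineModel and the three stages are hypothetical classes written for this illustration. Following the Spark ML convention, each estimator's fit() returns a fitted stage, every stage except the last transforms the data for the next one, and the final estimator is only fitted.

```python
class Lowercase:
    """Transformer only: no fit() method."""

    def transform(self, X):
        return [x.lower() for x in X]


class LengthFeature:
    """Transformer only: turns each text into its number of characters."""

    def transform(self, X):
        return [len(x) for x in X]


class MeanThresholdModel:
    """Fitted stage: predicts 1 when the feature exceeds the learned threshold."""

    def __init__(self, threshold):
        self.threshold = threshold

    def transform(self, X):
        return [1 if x > self.threshold else 0 for x in X]


class MeanThreshold:
    """Estimator: fit() learns the mean feature value and returns a fitted model."""

    def fit(self, X, y=None):
        return MeanThresholdModel(sum(X) / len(X))


class MiniPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, X):
        for stage in self.stages:  # apply every fitted stage in order
            X = stage.transform(X)
        return X


class MiniPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, X, y=None):
        fitted = []
        for i, stage in enumerate(self.stages):
            if hasattr(stage, "fit"):
                stage = stage.fit(X, y)  # estimator: learn from the current data
            fitted.append(stage)
            if i < len(self.stages) - 1:
                X = stage.transform(X)  # pass the transformed data to the next stage
        return MiniPipelineModel(fitted)


model = MiniPipeline([Lowercase(), LengthFeature(), MeanThreshold()]).fit(["Car", "Rocket launch", "GPU"])
print(model.transform(["Space shuttle", "Mac"]))  # [1, 0]
```

The learned threshold is the mean training length (19 / 3), so the long title is classified as 1 and the short one as 0.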
In [20]:
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from sklearn.tree import DecisionTreeClassifier
from sklearn.pipeline import Pipeline
# Instantiate a Pipeline
text_clf = Pipeline([('vect', CountVectorizer()),
                     ('tfidf', TfidfTransformer()),
                     ('clf', DecisionTreeClassifier(max_depth=10)),
                     ])
# Transform the data and train the classifier on the training set
text_clf = text_clf.fit(X_train, y_train)
# Transform the data and perform predictions on the test set
y_test_pred = text_clf.predict(X_test)
In [23]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
# Instantiate all the necessary Estimators and Transformers
tokenizer = Tokenizer(inputCol='news', outputCol='news_words')
hashing_tf = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol='news_tf', numFeatures=10000)
idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="news_tfidf")
string_indexer = StringIndexer(inputCol='target', outputCol='target_indexed')
dt = DecisionTreeClassifier(featuresCol=idf.getOutputCol(), labelCol=string_indexer.getOutputCol(), maxDepth=10)
# Instantiate a Pipeline
pipeline = Pipeline(stages=[tokenizer,
                            hashing_tf,
                            idf,
                            string_indexer,
                            dt])
# Transform the data and train the classifier on the training set
pipeline_model = pipeline.fit(df_train)
# Transform the data and perform predictions on the test set
df_test_pred = pipeline_model.transform(df_test)
In [24]:
df_test_pred.show(5)
Once we have built our pipeline, it is time to evaluate it. This is where the test set is crucial. We perform predictions on the test set, as if we didn't know the actual classes, and then compare the predictions with the ground truth. If we did this on the training set, the score would be biased, because we would be predicting on the very data used to build the model. Keeping a test set whose data is not used to build the model helps in measuring the generalisation capacity of the model.
Both scikit-learn and Spark ML have built-in metrics to score all kinds of predictions. In our case, we will measure the percentage of correctly classified documents. For single-label multiclass predictions this is the accuracy, which also equals the micro-averaged precision: it is given by the precision_score function with average='micro' in scikit-learn, and by the MulticlassClassificationEvaluator object with metricName='accuracy' in Spark ML (this metric was called 'precision' in Spark 1.x).
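The claim that micro-averaged precision equals accuracy for single-label predictions can be checked by hand: every prediction counts as a positive for exactly one class, so the micro-averaged true positives plus false positives add up to the number of samples, and the true positives are exactly the correct predictions. The plain-Python sketch below computes both quantities on made-up labels; the helper names are hypothetical.

```python
def accuracy(y_true, y_pred):
    """Share of predictions equal to the ground truth."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


def micro_precision(y_true, y_pred):
    """Pool true and false positives over all classes, then divide once."""
    labels = set(y_true) | set(y_pred)
    tp = sum(sum(1 for t, p in zip(y_true, y_pred) if p == c and t == c) for c in labels)
    fp = sum(sum(1 for t, p in zip(y_true, y_pred) if p == c and t != c) for c in labels)
    return tp / (tp + fp)


y_true = [0, 1, 2, 2, 1, 0]
y_pred = [0, 2, 2, 2, 1, 1]
print(accuracy(y_true, y_pred), micro_precision(y_true, y_pred))  # both equal 4/6
```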
In [25]:
from sklearn.metrics import precision_score
# Evaluate the predictions done on the test set
precision_score(y_test, y_test_pred, average='micro')  # signature is (y_true, y_pred)
Out[25]:
In [26]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Instantiate a MulticlassClassificationEvaluator measuring the share of correct predictions
# ('accuracy' since Spark 2.0; Spark 1.x called this metric 'precision')
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='target_indexed',
                                              metricName='accuracy')
# Evaluate the predictions done on the test set
evaluator.evaluate(df_test_pred)
Out[26]:
Scores are different, mainly because the default parameters are not the same in scikit-learn and Spark ML (the train/test splits and the text features also differ).
We now would like to improve the score of our model. One way to do that is to tune the parameters in order to find the best combination of parameters.
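Before looking at the library objects, the mechanics of a grid search with k-fold cross-validation can be sketched in plain Python: every combination of the parameter grid is scored on each fold while the model is trained on the other folds, and the combination with the best mean score wins. The helpers below and the toy threshold "model" are hypothetical. Both GridSearchCV and CrossValidator additionally refit the best combination on the whole training set, which this sketch omits.

```python
from itertools import product


def k_fold_indices(n, k):
    """Split range(n) into k contiguous folds (no shuffling, for simplicity)."""
    fold_sizes = [n // k + (1 if i < n % k else 0) for i in range(k)]
    folds, start = [], 0
    for size in fold_sizes:
        folds.append(list(range(start, start + size)))
        start += size
    return folds


def grid_search_cv(X, y, param_grid, fit_and_score, k=3):
    """Return (best_params, best_mean_score) over every combination in param_grid."""
    names = sorted(param_grid)
    best_params, best_score = None, float("-inf")
    for values in product(*(param_grid[name] for name in names)):
        params = dict(zip(names, values))
        scores = []
        for test_idx in k_fold_indices(len(X), k):
            held_out = set(test_idx)
            train_idx = [i for i in range(len(X)) if i not in held_out]
            scores.append(fit_and_score(params,
                                        [X[i] for i in train_idx], [y[i] for i in train_idx],
                                        [X[i] for i in test_idx], [y[i] for i in test_idx]))
        mean = sum(scores) / len(scores)
        if mean > best_score:
            best_params, best_score = params, mean
    return best_params, best_score


def threshold_fit_and_score(params, X_train, y_train, X_test, y_test):
    # Toy "model": predicts 1 when x > threshold; it has nothing to learn,
    # so the training fold is unused here
    preds = [1 if x > params["threshold"] else 0 for x in X_test]
    return sum(p == t for p, t in zip(preds, y_test)) / len(y_test)


X = [1, 2, 3, 4, 5, 6]
y = [0, 0, 0, 1, 1, 1]
print(grid_search_cv(X, y, {"threshold": [1, 3, 5]}, threshold_fit_and_score))  # ({'threshold': 3}, 1.0)
```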
Tuning is generally done using the following tools:
In scikit-learn, one can use the GridSearchCV object; in Spark ML, it is the CrossValidator object. In both cases, there are three things that we need to specify:
In [27]:
from sklearn.model_selection import GridSearchCV  # sklearn.grid_search was removed in scikit-learn 0.20
# Create the parameters grid
parameters = {'tfidf__use_idf': (True, False),
              'clf__max_depth': (10, 20)
              }
# Instantiate a GridSearchCV object with the pipeline, the parameters grid and the scoring function
gs_clf = GridSearchCV(text_clf, parameters, scoring='precision_micro', n_jobs=-1)
# Transform the data and train the classifier on the training set
gs_clf = gs_clf.fit(X_train, y_train)
# Transform the data and perform predictions on the test set
y_test_pred = gs_clf.predict(X_test)
# Evaluate the predictions done on the test set
precision_score(y_test, y_test_pred, average='micro')  # signature is (y_true, y_pred)
In [28]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
# Instantiation of a ParamGridBuilder: try two maximum depths for the decision tree
# (the evaluator already carries its metric, so it does not belong in the grid)
grid = (ParamGridBuilder()
        .addGrid(dt.maxDepth, [10, 20])
        .build())
# Instantiation of a CrossValidator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator)
# Transform the data and train the classifier on the training set
cv_model = cv.fit(df_train)
# Transform the data and perform predictions on the test set
df_test_pred = cv_model.transform(df_test)
# Evaluate the predictions done on the test set
evaluator.evaluate(df_test_pred)
Out[28]:
Again, the results are different, since the two grids do not tune the same parameters and the default values may not be the same. Moreover, we did not use exactly the same objects in the Feature Engineering phase (CountVectorizer versus Tokenizer and HashingTF, for example).
As we saw, scikit-learn and Spark ML have a lot in common. There are slight differences between the two libraries, in terms of implementation and how the data is handled, but they are minimal. Spark ML was designed to be used much like scikit-learn, which helps a lot when scaling up with Spark to build complex Machine Learning pipelines.
Spark ML is still under active development and, for now, implements fewer algorithms than scikit-learn. The range of possibilities offered by Spark ML will expand over time, and moving from scikit-learn to Spark ML will become easier and easier.